{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Name\n",
    "\n",
    "Data preparation using Spark on YARN with Cloud Dataproc\n",
    "\n",
    "\n",
    "# Label\n",
    "\n",
    "Cloud Dataproc, GCP, Cloud Storage, Spark, Kubeflow, pipelines, components, YARN\n",
    "\n",
    "\n",
    "# Summary\n",
    "\n",
    "A Kubeflow Pipeline component to prepare data by submitting a Spark job on YARN to Cloud Dataproc.\n",
    "\n",
    "# Details\n",
    "\n",
    "## Intended use\n",
    "\n",
    "Use the component to run an Apache Spark job as one preprocessing step in a Kubeflow Pipeline.\n",
    "\n",
    "## Runtime arguments\n",
    "Argument | Description  | Optional | Data type | Accepted values | Default |\n",
    ":--- | :---------- | :--- | :------- | :------| :------| \n",
    "project_id | The ID of the Google Cloud Platform (GCP) project that the cluster belongs to.|No | GCPProjectID |  |  |\n",
    "region | The Cloud Dataproc region to handle the request. | No  | GCPRegion |  |  |  \n",
    "cluster_name | The name of the cluster to run the job. | No | String |  |  |\n",
    "main_jar_file_uri | The Hadoop Compatible Filesystem (HCFS) URI of the JAR file that contains the main class. | No | GCSPath |  | |\n",
    "main_class | The name of the driver's main class. The JAR file that contains the class must be either in the default CLASSPATH or specified in  `spark_job.jarFileUris`.| No |  |  |  | \n",
    "args | The arguments to pass to the driver. Do not include arguments, such as --conf, that can be set as job properties, since a collision may occur that causes an incorrect job submission.| Yes |  |  | |\n",
    "spark_job | The payload of a [SparkJob](https://cloud.google.com/dataproc/docs/reference/rest/v1/SparkJob).| Yes |  |  |  |\n",
    "job | The payload of a [Dataproc job](https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.jobs). | Yes |  |  |  |\n",
    "wait_interval | The number of seconds to wait between polling the operation.  |  Yes  | |  | 30 |\n",
    "\n",
    "## Output\n",
    "Name | Description | Type\n",
    ":--- | :---------- | :---\n",
    "job_id | The ID of the created job. | String\n",
    "\n",
    "## Cautions & requirements\n",
    "\n",
    "To use the component, you must:\n",
    "\n",
    "\n",
    "\n",
    "*   Set up a GCP project by following this [guide](https://cloud.google.com/dataproc/docs/guides/setup-project).\n",
    "*   [Create a new cluster](https://cloud.google.com/dataproc/docs/guides/create-cluster).\n",
    "*   The component can authenticate to GCP. Refer to [Authenticating Pipelines to GCP](https://www.kubeflow.org/docs/gke/authentication-pipelines/) for details.\n",
    "*   Grant the Kubeflow user service account the role `roles/dataproc.editor` on the project.\n",
    "\n",
    "\n",
    "## Detailed description\n",
    "\n",
    "This component creates a Spark job from [Dataproc submit job REST API](https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.jobs/submit).\n",
    "\n",
    "Follow these steps to use the component in a pipeline:\n",
    "\n",
    "\n",
    "\n",
    "1.  Install the Kubeflow Pipeline SDK:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%capture --no-stderr\n",
    "\n",
    "KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.14/kfp.tar.gz'\n",
    "!pip3 install $KFP_PACKAGE --upgrade"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "2. Load the component using KFP SDK"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import kfp.components as comp\n",
    "\n",
    "dataproc_submit_spark_job_op = comp.load_component_from_url(\n",
    "    'https://raw.githubusercontent.com/kubeflow/pipelines/01a23ae8672d3b18e88adf3036071496aca3552d/components/gcp/dataproc/submit_spark_job/component.yaml')\n",
    "help(dataproc_submit_spark_job_op)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Sample\n",
    "Note: The following sample code works in an IPython notebook or directly in Python code.\n",
    "\n",
    "\n",
    "#### Set up a Dataproc cluster\n",
    "[Create a new Dataproc cluster](https://cloud.google.com/dataproc/docs/guides/create-cluster) (or reuse an existing one) before running the sample code.\n",
    "\n",
    "\n",
    "#### Prepare a Spark job\n",
    "Upload your Spark JAR file to a Cloud Storage bucket. In the sample, we use a JAR file that is preinstalled in the main cluster: `file:///usr/lib/spark/examples/jars/spark-examples.jar`.\n",
    "\n",
    "Here is the [source code of the sample](https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java).\n",
    "\n",
    "To package a self-contained Spark application, follow these [instructions](https://spark.apache.org/docs/latest/quick-start.html#self-contained-applications).\n",
    "\n",
    "\n",
    "#### Set sample parameters"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "tags": [
     "parameters"
    ]
   },
   "outputs": [],
   "source": [
    "PROJECT_ID = '<Please put your project ID here>'\n",
    "CLUSTER_NAME = '<Please put your existing cluster name here>'\n",
    "REGION = 'us-central1'\n",
    "SPARK_FILE_URI = 'file:///usr/lib/spark/examples/jars/spark-examples.jar'\n",
    "MAIN_CLASS = 'org.apache.spark.examples.SparkPi'\n",
    "ARGS = ['1000']\n",
    "EXPERIMENT_NAME = 'Dataproc - Submit Spark Job'"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Example pipeline that uses the component"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import kfp.dsl as dsl\n",
    "import json\n",
    "@dsl.pipeline(\n",
    "    name='Dataproc submit Spark job pipeline',\n",
    "    description='Dataproc submit Spark job pipeline'\n",
    ")\n",
    "def dataproc_submit_spark_job_pipeline(\n",
    "    project_id = PROJECT_ID, \n",
    "    region = REGION,\n",
    "    cluster_name = CLUSTER_NAME,\n",
    "    main_jar_file_uri = '',\n",
    "    main_class = MAIN_CLASS,\n",
    "    args = json.dumps(ARGS), \n",
    "    spark_job=json.dumps({ 'jarFileUris': [ SPARK_FILE_URI ] }), \n",
    "    job='{}', \n",
    "    wait_interval='30'\n",
    "):\n",
    "    dataproc_submit_spark_job_op(\n",
    "        project_id=project_id, \n",
    "        region=region, \n",
    "        cluster_name=cluster_name, \n",
    "        main_jar_file_uri=main_jar_file_uri, \n",
    "        main_class=main_class,\n",
    "        args=args, \n",
    "        spark_job=spark_job, \n",
    "        job=job, \n",
    "        wait_interval=wait_interval)\n",
    "    "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Compile the pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "pipeline_func = dataproc_submit_spark_job_pipeline\n",
    "pipeline_filename = pipeline_func.__name__ + '.zip'\n",
    "import kfp.compiler as compiler\n",
    "compiler.Compiler().compile(pipeline_func, pipeline_filename)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Submit the pipeline for execution"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#Specify pipeline argument values\n",
    "arguments = {}\n",
    "\n",
    "#Get or create an experiment and submit a pipeline run\n",
    "import kfp\n",
    "client = kfp.Client()\n",
    "experiment = client.create_experiment(EXPERIMENT_NAME)\n",
    "\n",
    "#Submit a pipeline run\n",
    "run_name = pipeline_func.__name__ + ' run'\n",
    "run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## References\n",
    "\n",
    "*   [Component Python code](https://github.com/kubeflow/pipelines/blob/master/components/gcp/container/component_sdk/python/kfp_component/google/dataproc/_submit_spark_job.py)\n",
    "*   [Component Docker file](https://github.com/kubeflow/pipelines/blob/master/components/gcp/container/Dockerfile)\n",
    "*   [Sample notebook](https://github.com/kubeflow/pipelines/blob/master/components/gcp/dataproc/submit_spark_job/sample.ipynb)\n",
    "*   [Dataproc SparkJob](https://cloud.google.com/dataproc/docs/reference/rest/v1/SparkJob)\n",
    "\n",
    "## License\n",
    "By deploying or using this software you agree to comply with the [AI Hub Terms of Service](https://aihub.cloud.google.com/u/0/aihub-tos) and the [Google APIs Terms of Service](https://developers.google.com/terms/). To the extent of a direct conflict of terms, the AI Hub Terms of Service will control."
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.4"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}